-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add configurable pubsub package so we can increase the number of publishers on topics #261
Conversation
I dont really like that, this is kind of taking away the benefit of using a generic package like gocloud.dev/pubsub . Is there really no other way? |
Not ready for review |
"gocloud.dev/pubsub/gcppubsub" | ||
) | ||
|
||
const Scheme = "configurable-gcppubsub" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a very long name for a scheme 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
haha I shorterned it to tm-gcppubsub. I wouldn't be against changing the package name to this also but the name is pretty descriptive.
pc, err := gcppubsub.PublisherClient(ctx, conn) | ||
if err != nil { | ||
return nil, err | ||
} | ||
topicPath := path.Join(u.Host, u.Path) | ||
if topicPathRE.MatchString(topicPath) { | ||
return gcppubsub.OpenTopicByPath(pc, topicPath, &opts) | ||
} | ||
// Shortened form? | ||
topicName := strings.TrimPrefix(u.Path, "/") | ||
return gcppubsub.OpenTopic(pc, gcp.ProjectID(u.Host), topicName, &opts), nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about using gcppubsub.URLOpener
here? You initialise it with the connection and topic options and then just call OpenTopicURL with the URL. That way we can still benefit from their implementation and their extra params.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I didn't go for this is because I did want to play around with the url passed into the service. We would take a url, get the params from it, reconstruct the url, modify the schema and then pass it to the URL opener. If the schema changes then this function would need to change. We still get the default gcp opts doing it this way. What do you think?
Are we truly sure this is the issue? It seems strange that we're really hitting the limits of publish rate - how many messages are we pushing to pub/sub? |
I just realised as we publish with an ordering key we want a batch size of 1. Given this we can increase the number of handlers within the server application code so I'm going to change this! |
thought-machine#262) * Group pubsub opts for api server and add metrics related to failing to publish messages
* Add rate limiting bucket to skip Redis if we have errored often * Puku format * Add x/time to go.mod * Add limit in more locations * Don't mess up the go.mod * Remove the limit on the write * Avoid any leak of the digest proto * Bump version
thought-machine#262) * Group pubsub opts for api server and add metrics related to failing to publish messages
* Randomly offset retention and expiration time.
* Don't use Allow, because that consumes tokens * Check for tokens existing * Version + changelog --------- Co-authored-by: Alexander Garbett <[email protected]>
…hought-machine#269) * Receive from queue after getting list of jobs.
…#274) * Add GCP Cloud profiler support * Undo a couple of Puku changes * Update Go and all packages to latest * Attempt to update to later versions of Go * Missed the dockerfile * And latest Docker tags as well * Try removing systemtoolchain? * Revert to older image * Attempt to calm the linter * Actually revert the SDK override * Attempt to get around the linter? * Bump golang CI? * Fix defer for timer * Attempt go.mod surgery * Silence linter, add
* Bump to 11.7.0 * Add changelog --------- Co-authored-by: Alexander Garbett <[email protected]>
* Update remote-apis-sdk to right commit * Bump version and changelog * Update go.sum as well --------- Co-authored-by: Alexander Garbett <[email protected]>
* Set lowercase names for the profiler to work. * Bump versions as well --------- Co-authored-by: Alexander Garbett <[email protected]>
* Increase timeout for UploadIfMissing * bump timeout to 10 minutes
* periodically delete jobs in Mettle api server * don't defer mutex unlock * Lock whole map when deleting * Add metric * Version + changelog * Remove newline * lock/unlock mutex correctly * Observe time in correct place
* Refactor gauge strategy to use function
* Reduce verbosity of api server logs * Revert stream back to warning
* Reduce verbosity of worker logs * Remove more log lines, and make sure errors contain hash
* Make platform details optional * Put the validate check in a better place --------- Co-authored-by: Alexander Garbett <[email protected]>
* Change update logic to match previous behaviour, update time in a few locations. * Version + Changelog --------- Co-authored-by: Hamish Pitkeathly <[email protected]>
Got myself in a right rebase mess here so abandoning |
The number of handlers used by a topic is not configurable using the gcppubsub package. We are often seeing context deadline exceeded errors when publishing to the response queue at times when throughput is high. This error is client side and little to no deadline exceeded errors are seen on the gcp side.
This revision adds a configurable pubsub package so we configure both the max handlers and the batch size. We will now have to pass the topic url with the prefix configureable- if we want to increase the max handlers.